Conversation

Contributor

@rdblue rdblue commented Dec 22, 2023

This refactors the Avro generic reader so that it resolves schemas directly (like PyIceberg) rather than creating an Avro schema to trick Avro's ResolvingDecoder into projecting columns correctly.

This makes the read path easier to maintain because there is no need to hijack and rewrite schemas in ProjectionDatumReader using BuildAvroProjection. This should make it much easier to add default value support.

@github-actions github-actions bot added the core label Dec 22, 2023
/**
* An interface for Avro DatumReaders to support custom record classes.
*/
interface SupportsCustomRecords {
Contributor Author

This is internal for now. I'm not sure that we want to expose it more broadly yet.

}

public abstract static class PlannedStructReader<S>
implements ValueReader<S>, SupportsRowPosition {
Contributor Author

This is the new base class that implements a read plan, which is a list of positions and readers. The read plan is produced by the new logic in GenericAvroReader that handles structs.
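
A minimal sketch of the read-plan idea described above, not the code from this PR: each plan step pairs an output position with a reader, and the struct reader simply runs the steps in order. The names `SketchReader`, `PlanStep`, and `PlannedRowReader` are hypothetical, and the "decoder" is simplified to an iterator of already-decoded values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a value reader; the "decoder" is just an iterator of values.
interface SketchReader<T> {
  T read(Iterator<Object> decoder);
}

// One step of a read plan: where the value goes in the output row and how to read it.
final class PlanStep {
  final int position;
  final SketchReader<?> reader;

  PlanStep(int position, SketchReader<?> reader) {
    this.position = position;
    this.reader = reader;
  }
}

// Executes the plan in order and stores each result at its planned position.
final class PlannedRowReader implements SketchReader<Object[]> {
  private final int width;
  private final List<PlanStep> plan;

  PlannedRowReader(int width, List<PlanStep> plan) {
    this.width = width;
    this.plan = new ArrayList<>(plan);
  }

  @Override
  public Object[] read(Iterator<Object> decoder) {
    Object[] row = new Object[width];
    for (PlanStep step : plan) {
      row[step.position] = step.reader.read(decoder);
    }
    return row;
  }

  // Demo: the file stores (id, name) but the output row wants (name, id).
  static Object[] demo() {
    SketchReader<Object> next = decoder -> decoder.next();
    List<PlanStep> plan = Arrays.asList(new PlanStep(1, next), new PlanStep(0, next));
    return new PlannedRowReader(2, plan).read(Arrays.<Object>asList(7L, "a").iterator());
  }
}
```

In this sketch the struct reader never decides what to read; it only executes a plan that was built elsewhere.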

PositionReader(long rowPosition) {
this.currentPosition = rowPosition - 1;
}
static class PositionReader implements ValueReader<Long>, SupportsRowPosition {
Contributor Author

This refactor includes changes to make PositionReader easier to use. Rather than needing to instantiate it inside of setRowPositionSupplier, it now just implements SupportsRowPosition. The logic for creating a position reader is now part of the object model rather than part of StructReader.
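
As a rough illustration of a position reader that implements the row-position hook itself, here is a simplified sketch. `RowPositionAware` and `SketchPositionReader` are hypothetical names, and the lazy initialization is an assumption made for the example rather than a description of the merged code.

```java
import java.util.function.Supplier;

// Hypothetical, simplified version of the row-position hook.
interface RowPositionAware {
  void setRowPositionSupplier(Supplier<Long> posSupplier);
}

// Produces consecutive row positions, starting from the supplied position.
final class SketchPositionReader implements RowPositionAware {
  private Supplier<Long> posSupplier = () -> 0L;
  private long currentPosition = -1;
  private boolean initialized = false;

  @Override
  public void setRowPositionSupplier(Supplier<Long> posSupplier) {
    this.posSupplier = posSupplier;
    this.initialized = false;
  }

  // Each call corresponds to reading one row.
  public long read() {
    if (!initialized) {
      // Start one before the first row so that the increment below lands on it.
      currentPosition = posSupplier.get() - 1;
      initialized = true;
    }
    currentPosition += 1;
    return currentPosition;
  }
}
```

Because the reader receives the supplier directly, whoever builds the readers can create it like any other reader and hand it the supplier later.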

.as("Field missing from table mapping is renamed")
.isNotNull();
- Assertions.assertThat(projected.get("location_r5"))
+ Assertions.assertThat(projected.get("location"))
Contributor Author

These test updates fix the odd behavior caused by rewriting the read schema. Name mapping previously needed to produce fields with incorrect names in order to project fields but not have them read (by name) from the data file.

Contributor

@Fokko Fokko left a comment

Left a few questions, but looks good! Excited to see the skipping in there as well. Once we write the sizes for arrays/maps, this should also speed up reading quite a bit.

this.nameMapping = MappingUtil.create(schema);
}

DatumReader<D> reader;
Contributor

I always like to make these final so you're sure that it doesn't slip through a branch unassigned.

Suggested change
- DatumReader<D> reader;
+ final DatumReader<D> reader;

Contributor Author

I don't think it is necessary in this case. The compiler will catch if it is unset because no default was provided.
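
A small sketch of the compiler check being referred to here (Java's definite assignment rule for local variables). `DefiniteAssignmentSketch` and the string values are made up for illustration.

```java
final class DefiniteAssignmentSketch {
  static String pick(boolean reuse) {
    String reader; // no initializer, so every path must assign before use
    if (reuse) {
      reader = "reused";
    } else {
      reader = "new";
    }
    // Deleting either assignment above makes javac reject the next line with
    // "variable reader might not have been initialized".
    return reader;
  }
}
```

Adding `final` would additionally reject a second assignment, which is the extra guarantee the suggestion was after.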

((SupportsRowPosition) reader)
- .setRowPositionSupplier(() -> AvroIO.findStartingRowPos(file::newStream, start));
+ .setRowPositionSupplier(
+ Suppliers.memoize(() -> AvroIO.findStartingRowPos(file::newStream, start)));
Contributor

Why the memoize? Are we reading the same file multiple times?

Contributor Author

@rdblue rdblue Jan 1, 2024

Previously, this was being done in the StructValueReader. If the struct reader inserted a PositionReader, it would also rewrite the position supplier.

That added a lot of complexity to the value reader and didn't work in all cases (for example, if two structs had position columns), so I moved the memoization here. It's simpler that way and lets us construct position readers in the same place as the other readers, instead of tracking the position index in a struct and injecting the reader when setRowPositionSupplier is called.
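
To illustrate why memoizing at the call site helps when more than one reader asks for the starting position, here is a plain-Java sketch. The `memoize` helper is a simplified, single-threaded stand-in for Guava's `Suppliers.memoize`, and the counter stands in for the file scan; both are assumptions made for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class MemoizeSketch {
  // Minimal single-threaded memoizing wrapper (stand-in for Guava's Suppliers.memoize).
  static <T> Supplier<T> memoize(Supplier<T> delegate) {
    return new Supplier<T>() {
      private boolean computed = false;
      private T value;

      @Override
      public T get() {
        if (!computed) {
          value = delegate.get();
          computed = true;
        }
        return value;
      }
    };
  }

  // Returns how many times the expensive lookup ran when two readers share one supplier.
  static int lookupsForTwoReaders() {
    AtomicInteger lookups = new AtomicInteger();
    Supplier<Long> startPos =
        memoize(
            () -> {
              lookups.incrementAndGet(); // stands in for scanning the file for the start row
              return 100L;
            });
    startPos.get(); // first position reader
    startPos.get(); // second position reader
    return lookups.get();
  }
}
```

With the wrapper in place the lookup runs once no matter how many position readers share the supplier.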

}
}

private static class RequiredOptionReader implements ValueReader<Object> {
Contributor

Why do we need this next to the UnionReader?

Contributor Author

This is actually unused so we could remove it. The purpose is to be able to replace the union reader with one that checks that the value is non-null for cases where the file has an optional field but the expected schema requires it.

With Iceberg's schema evolution rules, we should never have that case, which is why I didn't end up using this (it was complicated and of little value). But I included the class just in case we want it in the future.

It would be good to hear what you think. Should we keep or remove it?
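
For readers unfamiliar with the idea, a check of this kind could look roughly like the sketch below. It is not the class from this PR: `RequiredCheckSketch` is a hypothetical name, and a `Supplier` stands in for the wrapped reader of the optional file field.

```java
import java.util.function.Supplier;

// Wraps a reader for an optional file field and fails if it yields null for a required field.
final class RequiredCheckSketch<T> implements Supplier<T> {
  private final String fieldName;
  private final Supplier<T> optionalReader;

  RequiredCheckSketch(String fieldName, Supplier<T> optionalReader) {
    this.fieldName = fieldName;
    this.optionalReader = optionalReader;
  }

  @Override
  public T get() {
    T value = optionalReader.get();
    if (value == null) {
      throw new IllegalStateException("Missing value for required field: " + fieldName);
    }
    return value;
  }
}
```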

Contributor Author

Since you've approved this and I don't see any other required changes, I'm going to remove this to unblock getting this commit in.

Contributor

@Fokko Fokko Jan 1, 2024

Thanks for the context. I would remove it, the PR looks good, so feel free to merge it once the tests are green 👍

Types.NestedField field = expected.field(fieldId);
if (constant != null) {
readPlan.add(Pair.of(pos, ValueReaders.constant(constant)));
} else if (fieldId == MetadataColumns.IS_DELETED.fieldId()) {
Contributor

Do we need to codify these cases? They should just follow the Iceberg spec like any other Avro file.

Contributor Author

Yes, I think this is better than how we did it before.

Previously, we would inject these fields in the StructReader, but there are improvements that we can make to that approach:

  1. It isn't the struct reader's responsibility to skip or change the readers that are passed to it. Read "planning" should be done here, where the read and file schemas are both present.
  2. It wasn't clear what readers should be passed in or produced here, given that readers might be replaced.
  3. This unifies how new optional fields are handled with how row position and metadata fields are handled. It also sets up future default value handling using a constant reader, which is one of the reasons for making these changes.
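
The planning described in the list above can be sketched as follows. This is an illustration under stated assumptions, not the merged implementation: the plan is rendered as strings, the reserved ids are placeholders rather than Iceberg's real metadata column ids, and treating "is deleted" as a constant `false` is assumed for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ReadPlanSketch {
  // Placeholder ids for illustration only; the real metadata column ids are defined by Iceberg.
  static final int ROW_POSITION_ID = -1;
  static final int IS_DELETED_ID = -2;

  static List<String> plan(
      List<Integer> fileIds, List<Integer> expectedIds, Map<Integer, Object> idToConstant) {
    Map<Integer, Integer> idToPos = new LinkedHashMap<>();
    for (int pos = 0; pos < expectedIds.size(); pos += 1) {
      idToPos.put(expectedIds.get(pos), pos);
    }

    List<String> readPlan = new ArrayList<>();
    // File fields are handled in file order because the data is decoded sequentially.
    for (int fieldId : fileIds) {
      Integer pos = idToPos.get(fieldId);
      if (pos != null && !idToConstant.containsKey(fieldId)) {
        readPlan.add("read field " + fieldId + " -> pos " + pos);
        idToPos.remove(fieldId);
      } else {
        readPlan.add("skip field " + fieldId);
      }
    }

    // Everything still expected but not read from the file gets a synthetic reader.
    for (Map.Entry<Integer, Integer> entry : idToPos.entrySet()) {
      int fieldId = entry.getKey();
      int pos = entry.getValue();
      Object constant = idToConstant.get(fieldId);
      if (constant != null) {
        readPlan.add("constant " + constant + " -> pos " + pos);
      } else if (fieldId == IS_DELETED_ID) {
        readPlan.add("constant false -> pos " + pos);
      } else if (fieldId == ROW_POSITION_ID) {
        readPlan.add("row position -> pos " + pos);
      } else {
        readPlan.add("constant null -> pos " + pos); // new optional field missing from the file
      }
    }
    return readPlan;
  }

  // Demo: the file has fields 1, 2, 3; the read schema wants 3, 1, row position, 10, 4.
  static List<String> demo() {
    Map<Integer, Object> constants = new HashMap<>();
    constants.put(10, "2024-01-01");
    return plan(Arrays.asList(1, 2, 3), Arrays.asList(3, 1, ROW_POSITION_ID, 10, 4), constants);
  }
}
```

In this sketch a future default value would be one more branch that adds a constant reader, which matches the motivation given in point 3.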

@rdblue rdblue merged commit 604422b into apache:main Jan 1, 2024
Contributor Author

rdblue commented Jan 1, 2024

Thanks for reviewing, @Fokko!

lisirrx pushed a commit to lisirrx/iceberg that referenced this pull request Jan 4, 2024
Object constant = idToConstant.get(fieldId);
Types.NestedField field = expected.field(fieldId);
if (constant != null) {
readPlan.add(Pair.of(pos, ValueReaders.constant(constant)));
Contributor

I think here we need something along the lines of GenericAvroConstantReader that returns the constant in the Avro GenericData.Record format. Right now the ConstantReader class returns the constant object as is. Most of the time this constant is an Iceberg data constant, but what we need here is an Avro GenericData.Record. We can extend ConstantReader<T> here, but it is a private class. Can we promote it to public?

3 participants